package com.tcs.gather.model.kafka;

import java.util.Map.Entry;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.tcs.gather.model.CommandModel;
import com.tcs.util.constant.SystemConstants;

/**
 * Command model that extracts Kafka client settings from the generic gather
 * configuration held by {@link CommandModel}.
 *
 * <p>{@link #checkParams()} scans the inherited {@code properties} and copies the
 * entries whose key contains one of the names listed in
 * {@code SystemConstants.GatherConstants.ConsumerArray} or
 * {@code SystemConstants.GatherConstants.producerArray} into
 * {@link #getConsumerProperties()} and {@link #getProducerProperties()} respectively.
 *
 * @Title: KafkaCommandModel.java
 * @Package com.tcs.gather.model.kafka
 * @author 神经刀
 * @date 2018-03-26
 * @version V1.0
 */
public class KafkaCommandModel extends CommandModel {

	// Static so the logger is shared and is not part of instance state
	// (the class declares a serialVersionUID, so instances are presumably serialized).
	private static final Logger logger = LoggerFactory.getLogger(KafkaCommandModel.class);

	private static final long serialVersionUID = 1L;

	// NOTE: the individual setting fields below are declared for subclasses/callers;
	// nothing in this class assigns them.

	protected String bootstrapServers;	// cluster (bootstrap servers)

	protected String acks;	// acknowledgement setting

	protected String retries;	// number of retries

	protected String batchSize;	// size of one batch

	protected String lingerMs;	// linger time

	protected String bufferMemory;	// buffer memory

	protected String keySerializer;	// key serializer

	protected String valueSerializer;	// value serializer

	protected String groupId;	// group id

	protected String enableAutoCommit;	// if true, consumer offsets are committed periodically in the background

	protected String autoCommitIntervalMs;	// how often consumer offsets are committed, in milliseconds (original note: "to zookeeper")

	protected Properties producerProperties;	// producer configuration

	protected Properties consumerProperties;	// consumer configuration

	/**
	 * Rebuilds the producer and consumer configuration from the inherited
	 * {@code properties}.
	 *
	 * <p>Both result objects are always replaced with fresh, empty {@link Properties}.
	 * When the configured input type equals {@code SystemConstants.GatherConstants.AKFKA},
	 * the consumer configuration is filled; when the configured output type equals it,
	 * the producer configuration is filled. The two sides are evaluated independently,
	 * so a Kafka-to-Kafka configuration fills both. A missing input or output type is
	 * treated as "not Kafka" for that side.
	 *
	 * <p>Matching is by substring: a configured entry is copied when its key
	 * <em>contains</em> one of the known setting names, and it is stored under that
	 * known name.
	 *
	 * @throws NullPointerException if {@code properties} has not been set, or if a
	 *         matched entry cannot be stored
	 */
	@Override
	public void checkParams() throws NullPointerException {
		producerProperties = new Properties();
		consumerProperties = new Properties();
		if (properties == null) {
			throw new NullPointerException("properties must be set before checkParams() is called");
		}

		// Read both types up front and compare null-safely, so that an absent input
		// type no longer prevents the output side from being processed.
		String inputType = properties.getProperty(SystemConstants.GatherConstants.INPUTTYPE);
		String outputType = properties.getProperty(SystemConstants.GatherConstants.OUTPUTTYPE);

		if (inputType != null && inputType.equals(SystemConstants.GatherConstants.AKFKA)) {
			for (Entry<Object, Object> entry : properties.entrySet()) {
				String configuredKey = entry.getKey().toString();
				for (String key : SystemConstants.GatherConstants.ConsumerArray) {
					if (configuredKey.contains(key)) {
						try {
							// Consumer values are stored as-is (not converted to String).
							consumerProperties.put(key, entry.getValue());
						} catch (Exception e) {
							throw configError(key, e);
						}
					}
				}
			}
		}

		// Deliberately a separate "if" (not "else if"): input and output may both be Kafka.
		if (outputType != null && outputType.equals(SystemConstants.GatherConstants.AKFKA)) {
			for (Entry<Object, Object> entry : properties.entrySet()) {
				String configuredKey = entry.getKey().toString();
				for (String key : SystemConstants.GatherConstants.producerArray) {
					if (configuredKey.contains(key)) {
						try {
							// Producer values are stored in their String form.
							producerProperties.put(key, String.valueOf(entry.getValue()));
						} catch (Exception e) {
							throw configError(key, e);
						}
					}
				}
			}
		}
	}

	/**
	 * Builds the exception reported for a setting that could not be stored,
	 * keeping the underlying failure as its cause.
	 */
	private static NullPointerException configError(String key, Exception cause) {
		NullPointerException error = new NullPointerException("key : " + key + "配置错误!");
		error.initCause(cause);
		return error;
	}

	/** Creates a model without properties; they must be supplied before {@link #checkParams()} is called. */
	public KafkaCommandModel() {
	}

	/**
	 * Creates a model backed by the given configuration.
	 *
	 * @param properties the configuration passed to the {@link CommandModel} constructor
	 */
	public KafkaCommandModel(Properties properties) {
		super(properties);
	}

	/** @return the producer configuration; {@code null} until {@link #checkParams()} or the setter has been called */
	public Properties getProducerProperties() {
		return producerProperties;
	}

	/** @param producerProperties the producer configuration to use */
	public void setProducerProperties(Properties producerProperties) {
		this.producerProperties = producerProperties;
	}

	/** @return the consumer configuration; {@code null} until {@link #checkParams()} or the setter has been called */
	public Properties getConsumerProperties() {
		return consumerProperties;
	}

	/** @param consumerProperties the consumer configuration to use */
	public void setConsumerProperties(Properties consumerProperties) {
		this.consumerProperties = consumerProperties;
	}
}